Example analysis of Python custom process pool [producer and consumer model problems]

  • 2020-05-12 02:48:33
  • OfStack

This article analyzes the custom Python process pool as an example. I will share it with you for your reference as follows:

Code description 1 cut:


#encoding=utf-8
#author: walker
#date: 2014-05-21
#function:  Custom process pool traverses files under directory 
from multiprocessing import Process, Queue, Lock
import time, os
# consumers 
class Consumer(Process):
  def __init__(self, queue, ioLock):
    super(Consumer, self).__init__()
    self.queue = queue
    self.ioLock = ioLock
  def run(self):
    while True:
      task = self.queue.get()  # When there are no tasks in the queue, the process is blocked 
      if isinstance(task, str) and task == 'quit':
        break;
      time.sleep(1)  # Assume that task handling is required 1 seconds 
      self.ioLock.acquire()
      print( str(os.getpid()) + ' ' + task)
      self.ioLock.release()
    self.ioLock.acquire()
    print 'Bye-bye'
    self.ioLock.release()
# producers 
def Producer():
  queue = Queue()  # This queue is the process / thread-safe 
  ioLock = Lock()
  subNum = 4  # Number of child processes 
  workers = build_worker_pool(queue, ioLock, subNum)
  start_time = time.time()
  for parent, dirnames, filenames in os.walk(r'D:\test'):
    for filename in filenames:
      queue.put(filename)
      ioLock.acquire()
      print('qsize:' + str(queue.qsize()))
      ioLock.release()
      while queue.qsize() > subNum * 10: # Controls the number of tasks in the queue 
        time.sleep(1)
  for worker in workers:
    queue.put('quit')
  for worker in workers:
    worker.join()
  ioLock.acquire()
  print('Done! Time taken: {}'.format(time.time() - start_time))
  ioLock.release()
# Create process pool 
def build_worker_pool(queue, ioLock, size):
  workers = []
  for _ in range(size):
    worker = Consumer(queue, ioLock)
    worker.start()
    workers.append(worker)
  return workers
if __name__ == '__main__':
  Producer()

ps:


self.ioLock.acquire()
...
self.ioLock.release()

Available:


with self.ioLock:
  ...

Alternative.

Here's another fun example:


#encoding=utf-8
#author: walker
#date: 2016-01-06
#function: 1 Interesting examples of multiple processes 
import os, sys, time
from multiprocessing import Pool
cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))
g_List = ['a']
# Modify global variables g_List
def ModifyDict_1():
  global g_List
  g_List.append('b')
# Modify global variables g_List
def ModifyDict_2():
  global g_List
  g_List.append('c')
# To deal with 1 a 
def ProcOne(num):
  print('ProcOne ' + str(num) + ', g_List:' + repr(g_List))
# Handle all 
def ProcAll():
  pool = Pool(processes = 4)
  for i in range(1, 20):
    #ProcOne(i)
    #pool.apply(ProcOne, (i,))
    pool.apply_async(ProcOne, (i,))
  pool.close()
  pool.join()
ModifyDict_1() # Modify global variables g_List
if __name__ == '__main__':
  ModifyDict_2() # Modify global variables g_List
  print('In main g_List :' + repr(g_List))
  ProcAll()

The result of Windows7:


 Lambda.  python3 demo.py
In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b']
ProcOne 2, g_List:['a', 'b']
ProcOne 3, g_List:['a', 'b']
ProcOne 4, g_List:['a', 'b']
ProcOne 5, g_List:['a', 'b']
ProcOne 6, g_List:['a', 'b']
ProcOne 7, g_List:['a', 'b']
ProcOne 8, g_List:['a', 'b']
ProcOne 9, g_List:['a', 'b']
ProcOne 10, g_List:['a', 'b']
ProcOne 11, g_List:['a', 'b']
ProcOne 12, g_List:['a', 'b']
ProcOne 13, g_List:['a', 'b']
ProcOne 14, g_List:['a', 'b']
ProcOne 15, g_List:['a', 'b']
ProcOne 16, g_List:['a', 'b']
ProcOne 17, g_List:['a', 'b']
ProcOne 18, g_List:['a', 'b']
ProcOne 19, g_List:['a', 'b']

Results of Ubuntu 14.04:


In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b', 'c']
ProcOne 2, g_List:['a', 'b', 'c']
ProcOne 3, g_List:['a', 'b', 'c']
ProcOne 5, g_List:['a', 'b', 'c']
ProcOne 4, g_List:['a', 'b', 'c']
ProcOne 8, g_List:['a', 'b', 'c']
ProcOne 9, g_List:['a', 'b', 'c']
ProcOne 7, g_List:['a', 'b', 'c']
ProcOne 11, g_List:['a', 'b', 'c']
ProcOne 6, g_List:['a', 'b', 'c']
ProcOne 12, g_List:['a', 'b', 'c']
ProcOne 13, g_List:['a', 'b', 'c']
ProcOne 10, g_List:['a', 'b', 'c']
ProcOne 14, g_List:['a', 'b', 'c']
ProcOne 15, g_List:['a', 'b', 'c']
ProcOne 16, g_List:['a', 'b', 'c']
ProcOne 17, g_List:['a', 'b', 'c']
ProcOne 18, g_List:['a', 'b', 'c']
ProcOne 19, g_List:['a', 'b', 'c']

You can see that the second modification of Windows7 was not successful, while the modification of Ubuntu was successful. According to uliweb author limodou, the reason is that Windows is a subprocess that restarts the implementation; Linux is implemented by fork.

More about Python related topics: interested readers to view this site "Python URL skills summary", "Python pictures skills summary", "Python data structure and algorithm tutorial", "Python Socket programming skills summary", "Python function using techniques", "Python string skills summary", "Python introduction and advanced tutorial" and "Python file and directory skills summary"

I hope this article is helpful to you Python programming.


Related articles: